home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) '''Abstract Apport user interface. This encapsulates the workflow and common code for any user interface implementation (like GTK, Qt, or CLI). Copyright (C) 2007 Canonical Ltd. Author: Martin Pitt <martin.pitt@ubuntu.com> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. See http://www.gnu.org/copyleft/gpl.html for the full text of the license. ''' import glob import sys import os.path as os import optparse import time import traceback import locale import gettext import re import pwd import errno import urllib import zlib import subprocess import threading import webbrowser from gettext import gettext as _ import apport import apport.fileutils as apport import REThread from apport.crashdb import get_crashdb def thread_collect_info(report, reportfile, package): '''Encapsulate call to add_*_info() and update given report, so that this function is suitable for threading. If reportfile is not None, the file is written back with the new data.''' report.add_gdb_info() if not package: if report.has_key('ExecutablePath'): package = apport.fileutils.find_file_package(report['ExecutablePath']) else: raise KeyError, 'called without a package, and report does not have ExecutablePath' report.has_key('ExecutablePath') report.add_package_info(package) report.add_os_info() report.add_hooks_info() title = report.standard_title() if title: report['Title'] = title if 'Package' not in report or not apport.packaging.is_distro_package(report['Package'].split()[0]): if 'APPORT_REPORT_THIRDPARTY' in os.environ or apport.fileutils.get_config('main', 'thirdparty', False, bool = True): report['ThirdParty'] = 'True' else: report['UnreportableReason'] = _('This is not a genuine %s package') % report['DistroRelease'].split()[0] if report['ProblemType'] == 'Crash' and 'APPORT_IGNORE_OBSOLETE_PACKAGES' not in os.environ: old_pkgs = report.obsolete_packages() if old_pkgs: report['UnreportableReason'] = _('You have some obsolete package versions installed. Please upgrade the following packages and check if the problem still occurs:\n\n%s') % ', '.join(old_pkgs) report.anonymize() if reportfile: f = open(reportfile, 'a') os.chmod(reportfile, 0) report.write(f, only_new = True) f.close() apport.fileutils.mark_report_seen(reportfile) os.chmod(reportfile, 384) class UserInterface: '''Abstract base class for encapsulating the workflow and common code for any user interface implementation (like GTK, Qt, or CLI). A concrete subclass must implement all the abstract ui_* methods.''' def __init__(self): '''Initialize program state and parse command line options.''' self.gettext_domain = 'apport' self.report = None self.report_file = None self.cur_package = None try: self.crashdb = get_crashdb(None) except ImportError: e = None print >>sys.stderr, 'Could not import module, is a package upgrade in progress? Error:', e sys.exit(1) gettext.textdomain(self.gettext_domain) self.parse_argv() def run_crashes(self): '''Present all currently pending crash reports to the user, ask him what to do about them, and offer to file bugs for them. Return True if at least one crash report was processed, False otherwise.''' result = False for f in apport.fileutils.get_new_reports(): self.run_crash(f) result = True return result def run_crash(self, report_file, confirm = True): '''Present given crash report to the user, ask him what to do about it, and offer to file a bug for it. If confirm is False, the user will not be asked whether to report the problem.''' self.report_file = report_file try: try: apport.fileutils.mark_report_seen(report_file) except OSError: pass if not self.load_report(report_file): return None if 'Ignore' in self.report: return None if self.report.get('ProblemType') == 'Crash' and 'Signal' in self.report and 'CoreDump' not in self.report and 'Stacktrace' not in self.report: subject = os.path.basename(self.report.get('ExecutablePath', _('unknown program'))) heading = _('Sorry, the program "%s" closed unexpectedly') % subject self.ui_error_message(_('Problem in %s') % subject, '%s\n\n%s' % (heading, _('Your computer does not have enough free memory to automatically analyze the problem and send a report to the developers.'))) return None if self.report.has_key('UnsupportableReason'): self.ui_info_message(_('Unreportable problem'), _('The current configuration cannot be supported:\n\n%s') % self.report['UnsupportableReason']) return None if not confirm: pass elif self.report.get('ProblemType') == 'Package': response = self.ui_present_package_error() if response == 'cancel': return None if not response == 'report': raise AssertionError elif self.report.get('ProblemType') == 'KernelCrash': response = self.ui_present_kernel_error() if response == 'cancel': return None if not response == 'report': raise AssertionError elif self.report.get('ProblemType') == 'KernelOops': response = self.ui_present_kernel_error() if response == 'cancel': return None if not response == 'report': raise AssertionError else: try: desktop_entry = self.get_desktop_entry() except ValueError: response == 'cancel' response == 'cancel' response == 'cancel' self.ui_error_message(_('Invalid problem report'), _('The report belongs to a package that is not installed.')) self.ui_shutdown() return None response == 'cancel' response = self.ui_present_crash(desktop_entry) if not response.has_key('action'): raise AssertionError if not response.has_key('blacklist'): raise AssertionError if response['action'] == 'cancel': return None if response['action'] == 'restart': self.restart() return None if not response['action'] == 'report': raise AssertionError try: self.collect_info() except (IOError, zlib.error): response['action'] == 'report' response['action'] == 'report' response['action'] == 'restart' self.report = None self.ui_error_message(_('Invalid problem report'), _('This problem report is damaged and cannot be processed.')) return False response['action'] == 'cancel' except ValueError: response == 'cancel' if response['blacklist'] else response.has_key('action') response == 'cancel' if response['blacklist'] else response.has_key('action') response == 'cancel' self.ui_error_message(_('Invalid problem report'), _('The report belongs to a package that is not installed.')) self.ui_shutdown() return None response == 'cancel' elif self.report.has_key('UnreportableReason'): self.ui_info_message(_('Problem in %s') % self.report['Package'].split()[0], _('The problem cannot be reported:\n\n%s') % self.report['UnreportableReason']) return None 'Stacktrace' not in self.report if self.handle_duplicate(): return None self.file_report() except IOError: e = None if e.errno in (errno.EPERM, errno.EACCES): self.ui_error_message(_('Invalid problem report'), _('You are not allowed to access this problem report.')) sys.exit(1) elif e.errno == errno.ENOSPC: self.ui_error_message(_('Error'), _('There is not enough disk space available to process this report.')) sys.exit(1) else: self.ui_error_message(_('Invalid problem report'), e.strerror) sys.exit(1) except OSError: e = None if e.errno == errno.ENOMEM: print >>sys.stderr, 'Out of memory, aborting' sys.exit(1) else: raise e.errno == errno.ENOMEM def run_report_bug(self): '''Report a bug. If a pid is given on the command line, the report will contain runtime debug information. Either a package or a pid must be specified.''' if not (self.options.package) and not (self.options.pid): self.ui_error_message(_('No package specified'), _('You need to specify a package or a PID. See --help for more information.')) return False self.report = apport.Report('Bug') try: if self.options.pid: self.report.add_proc_info(self.options.pid) else: self.report.add_proc_environ() except OSError: not (self.options.pid) e = not (self.options.pid) if e.errno == errno.ENOENT: return False if e.errno == errno.EACCES: self.ui_error_message(_('Permission denied'), _('The specified process does not belong to you. Please run this program as the process owner or as root.')) return False raise except: e.errno == errno.EACCES if self.options.package == 'linux': self.cur_package = apport.packaging.get_kernel_package() else: self.cur_package = self.options.package try: self.collect_info() except ValueError: e = None if str(e) == 'package does not exist': self.ui_error_message(_('Invalid problem report'), _('Package %s does not exist') % self.cur_package) return False raise except: str(e) == 'package does not exist' if not self.handle_duplicate(): try: del self.report['ProcCmdline'] except KeyError: str(e) == 'package does not exist' str(e) == 'package does not exist' except: str(e) == 'package does not exist' response = self.ui_present_report_details() if response != 'cancel': self.file_report() return True def run_argv(self): '''Call appopriate run_* method according to command line arguments. Return True if at least one report has been processed, and False otherwise.''' if self.options.filebug: return self.run_report_bug() if self.options.crash_file: try: self.run_crash(self.options.crash_file, False) except OSError: self.options.filebug e = self.options.filebug self.ui_error_message(_('Invalid problem report'), str(e)) except: self.options.filebug return True return self.run_crashes() def parse_argv(self): '''Parse command line options and return (options, args) tuple.''' optparser = optparse.OptionParser('%prog [options]') optparser.add_option('-f', '--file-bug', help = 'Start in bug filing mode. Requires --package and an optional --pid, or just a --pid', action = 'store_true', dest = 'filebug', default = False) optparser.add_option('-p', '--package', help = 'Specify package name in --file-bug mode. This is optional if a --pid is specified.', action = 'store', type = 'string', dest = 'package', default = None) optparser.add_option('-P', '--pid', help = 'Specify a running program in --file-bug mode. If this is specified, the bug report will contain more information.', action = 'store', type = 'int', dest = 'pid', default = None) optparser.add_option('-c', '--crash-file', help = 'Report the crash from given .crash file instead of the pending ones in ' + apport.fileutils.report_dir, action = 'store', type = 'string', dest = 'crash_file', default = None, metavar = 'PATH') (self.options, self.args) = optparser.parse_args() def format_filesize(self, size): """Format the given integer as humanly readable and i18n'ed file size.""" if size < 1048576: return locale.format('%.1f KiB', size / 1024) if size < 1073741824: return locale.format('%.1f MiB', size / 1.04858e+06) return locale.format('%.1f GiB', size / float(1073741824)) def get_complete_size(self): '''Return the size of the complete report.''' try: return self.complete_size except AttributeError: size = 0 for k in self.report: if self.report[k]: size += len(self.report[k]) continue return size def get_reduced_size(self): '''Return the size of the reduced report.''' size = 0 for k in self.report: if k != 'CoreDump': if self.report[k]: size += len(self.report[k]) self.report[k] return size def restart(self): '''Reopen the crashed application.''' if not self.report.has_key('ProcCmdline'): raise AssertionError if os.fork() == 0: os.setsid() os.execlp('sh', 'sh', '-c', self.report.get('RespawnCommand', self.report['ProcCmdline'])) sys.exit(1) def collect_info(self): '''Collect missing information about the report from the system and display a progress dialog in the meantime. In particular, this adds OS, package and gdb information and checks bug patterns.''' if not (self.cur_package) and not self.report.has_key('ExecutablePath'): self.report.add_os_info() elif self.report['ProblemType'] == 'Crash' and 'Stacktrace' in self.report: return None self.ui_start_info_collection_progress() if not self.report.has_key('Stacktrace'): icthread = REThread.REThread(target = thread_collect_info, name = 'thread_collect_info', args = (self.report, self.report_file, self.cur_package)) icthread.start() while icthread.isAlive(): self.ui_pulse_info_collection_progress() try: icthread.join(0.1) continue except KeyboardInterrupt: sys.exit(1) continue None<EXCEPTION MATCH>KeyboardInterrupt icthread.exc_raise() if self.report.has_key('CrashDB'): self.crashdb = get_crashdb(None, self.report['CrashDB']) if self.report['ProblemType'] == 'KernelCrash' and self.report['ProblemType'] == 'KernelOops' or self.report.has_key('Package'): bpthread = REThread.REThread(target = self.report.search_bug_patterns, args = (self.crashdb.get_bugpattern_baseurl(),)) bpthread.start() while bpthread.isAlive(): self.ui_pulse_info_collection_progress() try: bpthread.join(0.1) continue except KeyboardInterrupt: sys.exit(1) continue None<EXCEPTION MATCH>KeyboardInterrupt bpthread.exc_raise() if bpthread.return_value(): self.report['BugPatternURL'] = bpthread.return_value() self.ui_stop_info_collection_progress() if ('SourcePackage' not in self.report or not self.report['ProblemType'].startswith('Kernel')) and 'Package' not in self.report: self.ui_error_message(_('Invalid problem report'), _('Could not determine the package or source package name.')) self.ui_shutdown() sys.exit(1) def open_url(self, url): '''Open the given URL in a new browser window. Display an error dialog if everything fails.''' (r, w) = os.pipe() if os.fork() > 0: os.close(w) (pid, status) = os.wait() if status: title = _('Unable to start web browser') error = _('Unable to start web browser to open %s.' % url) message = os.fdopen(r).readline() if message: error += '\n' + message self.ui_error_message(title, error) try: os.close(r) except OSError: pass return None os.setsid() os.close(r) try: uid = int(os.getenv('SUDO_UID')) gid = int(os.getenv('SUDO_GID')) sudo_prefix = [ 'sudo', '-H', '-u', '#' + str(uid)] except TypeError: os.fork() > 0 os.fork() > 0 uid = os.getuid() gid = None sudo_prefix = [] except: os.fork() > 0 try: try: if os.getenv('DISPLAY') and subprocess.call([ 'pgrep', '-x', '-u', str(uid), 'ksmserver'], stdout = subprocess.PIPE, stderr = subprocess.PIPE) == 0: subprocess.call(sudo_prefix + [ 'kfmclient', 'openURL', url]) sys.exit(0) except OSError: os.fork() > 0 os.fork() > 0 except: os.fork() > 0 try: if os.getenv('DISPLAY') and subprocess.call([ 'pgrep', '-x', '-u', str(uid), 'gnome-panel|gconfd-2'], stdout = subprocess.PIPE, stderr = subprocess.PIPE) == 0: gct = subprocess.Popen(sudo_prefix + [ 'gconftool', '--get', '/desktop/gnome/url-handlers/http/command'], stdout = subprocess.PIPE, stderr = subprocess.PIPE) if gct.returncode == 0: preferred_browser = gct.communicate()[0] browser = re.match('((firefox|seamonkey|flock)[^\\s]*)', preferred_browser) if browser: subprocess.call(sudo_prefix + [ browser.group(0), '-new-window', url]) sys.exit(0) browser = re.match('(epiphany[^\\s]*)', preferred_browser) if browser: subprocess.call(sudo_prefix + [ browser.group(0), '--new-window', url]) sys.exit(0) if subprocess.call(sudo_prefix + [ 'gnome-open', url]) == 0: sys.exit(0) except OSError: os.fork() > 0 os.fork() > 0 except: os.fork() > 0 if uid and gid: os.setgroups([ gid]) os.setgid(gid) os.setuid(uid) os.unsetenv('SUDO_USER') os.environ['HOME'] = pwd.getpwuid(uid).pw_dir webbrowser.open(url, new = True, autoraise = True) sys.exit(0) except Exception: os.fork() > 0 e = os.fork() > 0 os.write(w, str(e)) sys.exit(1) except: os.fork() > 0 def file_report(self): '''Upload the current report to the tracking system and guide the user to its web page.''' global _UserInterface__upload_progress if self.report.get('PackageArchitecture') == self.report.get('Architecture'): try: del self.report['PackageArchitecture'] except KeyError: pass except: None<EXCEPTION MATCH>KeyError None<EXCEPTION MATCH>KeyError _UserInterface__upload_progress = None def progress_callback(sent, total): global _UserInterface__upload_progress _UserInterface__upload_progress = float(sent) / total self.ui_start_upload_progress() upthread = REThread.REThread(target = self.crashdb.upload, args = (self.report, progress_callback)) upthread.start() while upthread.isAlive(): self.ui_set_upload_progress(_UserInterface__upload_progress) try: upthread.join(0.1) continue except KeyboardInterrupt: sys.exit(1) continue None<EXCEPTION MATCH>KeyboardInterrupt if upthread.exc_info(): self.ui_error_message(_('Network problem'), '%s:\n\n%s' % (_('Could not upload report data to crash database'), str(upthread.exc_info()[1]))) return None ticket = upthread.return_value() self.ui_stop_upload_progress() url = self.crashdb.get_comment_url(self.report, ticket) if url: self.open_url(url) def load_report(self, path): '''Load report from given path and do some consistency checks. This might issue an error message and return False if the report cannot be processed, otherwise self.report is initialized and True is returned.''' try: self.report = apport.Report() self.report.load(open(path), binary = 'compressed') except MemoryError: self.report = None self.ui_error_message(_('Memory exhaustion'), _('Your system does not have enough memory to process this crash report.')) return False except IOError: e = None self.report = None self.ui_error_message(_('Invalid problem report'), e.strerror) return False except (TypeError, ValueError, zlib.error): self.report = None self.ui_error_message(_('Invalid problem report'), _('This problem report is damaged and cannot be processed.')) return False elif self.report.has_key('Package'): self.cur_package = self.report['Package'].split()[0] else: self.cur_package = apport.fileutils.find_file_package(self.report.get('ExecutablePath', '')) exe_path = self.report.get('InterpreterPath', self.report.get('ExecutablePath')) if (not (self.cur_package) and self.report['ProblemType'] != 'KernelCrash' or self.report['ProblemType'] != 'KernelOops' or exe_path) and not os.path.exists(exe_path): msg = _('This problem report does not apply to a packaged program.') if self.report.has_key('ExecutablePath'): msg = '%s (%s)' % (msg, self.report['ExecutablePath']) self.report = None self.ui_info_message(_('Invalid problem report'), msg) return False self.complete_size = os.path.getsize(path) return True def get_desktop_entry(self): '''Try to get a matching .desktop file entry (xdg.DesktopEntry) for the current self.report and return it.''' if self.report.has_key('DesktopFile') and os.path.exists(self.report['DesktopFile']): desktop_file = self.report['DesktopFile'] else: desktop_file = apport.fileutils.find_package_desktopfile(self.cur_package) if desktop_file: try: import xdg.DesktopEntry as xdg return xdg.DesktopEntry.DesktopEntry(desktop_file) return None def handle_duplicate(self): '''Check whether the current bug report is already known as a bug pattern, and if so, tell the user about it, open the existing bug, and return True.''' if not self.report.has_key('BugPatternURL'): return False self.ui_info_message(_('Problem already known'), _('This problem was already reported in the bug report displayed in the web browser. Please check if you can add any further information that might be helpful for the developers.')) self.open_url(self.report['BugPatternURL']) return True def ui_present_crash(self, desktopentry): """Inform that a crash has happened for self.report and self.cur_package and ask about an action. If the package can be mapped to a desktop file, an xdg.DesktopEntry is passed as an argument; this can be used for enhancing strings, etc. Return the action and options as a dictionary: - Valid values for the 'action' key: ignore the crash ('cancel'), restart the crashed application ('restart'), or report a bug about the crash ('report'). - Valid values for the 'blacklist' key: True or False (True will cause the invocation of report.mark_ignore()).""" raise NotImplementedError, 'this function must be overridden by subclasses' def ui_present_package_error(self, desktopentry): """Inform that a package installation/upgrade failure has happened for self.report and self.cur_package and ask about an action. Return the action: ignore ('cancel'), or report a bug about the problem ('report').""" raise NotImplementedError, 'this function must be overridden by subclasses' def ui_present_kernel_error(self, desktopentry): """Inform that a kernel crash has happened for self.report and ask about an action. Return the action: ignore ('cancel'), or report a bug about the problem ('report').""" raise NotImplementedError, 'this function must be overridden by subclasses' def ui_present_report_details(self): """Show details of the bug report and choose between sending a complete or reduced report. This function can use the get_complete_size() and get_reduced_size() methods to determine the respective size of the data to send, and format_filesize() to convert it to a humanly readable form. Return the action: send full report ('full'), send reduced report ('reduced'), or do not send anything ('cancel').""" raise NotImplementedError, 'this function must be overridden by subclasses' def ui_info_message(self, title, text): '''Show an information message box with given title and text.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_error_message(self, title, text): '''Show an error message box with given title and text.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_start_info_collection_progress(self): '''Open a window with an indefinite progress bar, telling the user to wait while debug information is being collected.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_pulse_info_collection_progress(self): '''Advance the progress bar in the debug data collection progress window. This function is called every 100 ms.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_stop_info_collection_progress(self): '''Close debug data collection progress window.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_start_upload_progress(self): '''Open a window with an definite progress bar, telling the user to wait while debug information is being uploaded.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_set_upload_progress(self, progress): '''Set the progress bar in the debug data upload progress window to the given ratio (between 0 and 1, or None for indefinite progress). This function is called every 100 ms.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_stop_upload_progress(self): '''Close debug data upload progress window.''' raise NotImplementedError, 'this function must be overridden by subclasses' def ui_shutdown(self): '''This is called right before terminating the program and can be used for cleaning up.''' pass if __name__ == '__main__': import unittest import shutil import signal import tempfile from cStringIO import StringIO import problem_report class _TestSuiteUserInterface(UserInterface): '''Concrete UserInterface suitable for automatic testing.''' def __init__(self): self.crashdb_conf = tempfile.NamedTemporaryFile() print >>self.crashdb_conf, "default = 'testsuite'\ndatabases = {\n 'testsuite': { \n 'impl': 'memory',\n 'bug_pattern_base': None\n }\n}\n" self.crashdb_conf.flush() os.environ['APPORT_CRASHDB_CONF'] = self.crashdb_conf.name UserInterface.__init__(self) self.ic_progress_active = False self.ic_progress_pulses = 0 self.upload_progress_active = False self.upload_progress_pulses = 0 self.present_crash_response = None self.present_package_error_response = None self.present_kernel_error_response = None self.present_details_response = None self.opened_url = None self.clear_msg() def clear_msg(self): self.msg_title = None self.msg_text = None self.msg_severity = None def ui_present_crash(self, desktopentry): return self.present_crash_response def ui_present_package_error(self): return self.present_package_error_response def ui_present_kernel_error(self): return self.present_kernel_error_response def ui_present_report_details(self): return self.present_details_response def ui_info_message(self, title, text): self.msg_title = title self.msg_text = text self.msg_severity = 'info' def ui_error_message(self, title, text): self.msg_title = title self.msg_text = text self.msg_severity = 'error' def ui_start_info_collection_progress(self): self.ic_progress_pulses = 0 self.ic_progress_active = True def ui_pulse_info_collection_progress(self): if not self.ic_progress_active: raise AssertionError self.ic_progress_pulses += 1 def ui_stop_info_collection_progress(self): self.ic_progress_active = False def ui_start_upload_progress(self): self.upload_progress_pulses = 0 self.upload_progress_active = True def ui_set_upload_progress(self, progress): if not self.upload_progress_active: raise AssertionError self.upload_progress_pulses += 1 def ui_stop_upload_progress(self): self.upload_progress_active = False def open_url(self, url): self.opened_url = url class _UserInterfaceTest(unittest.TestCase): def setUp(self): for v in [ 'LANG', 'LANGUAGE', 'LC_MESSAGES', 'LC_ALL']: try: del os.environ[v] continue except KeyError: continue self.orig_report_dir = apport.fileutils.report_dir apport.fileutils.report_dir = tempfile.mkdtemp() self.orig_ignore_file = apport.report._ignore_file (fd, apport.report._ignore_file) = tempfile.mkstemp() os.close(fd) self.orig_argv = sys.argv sys.argv = [ 'ui-test'] self.ui = _TestSuiteUserInterface() self.report = apport.Report() self.report['Package'] = 'libfoo1 1-1' self.report['SourcePackage'] = 'foo' self.report['Foo'] = 'A' * 1000 self.report['CoreDump'] = 'A' * 100000 self.report_file = tempfile.NamedTemporaryFile() self.update_report_file() def update_report_file(self): self.report_file.seek(0) self.report_file.truncate() self.report.write(self.report_file) self.report_file.flush() def tearDown(self): sys.argv = self.orig_argv shutil.rmtree(apport.fileutils.report_dir) apport.fileutils.report_dir = self.orig_report_dir self.orig_report_dir = None os.unlink(apport.report._ignore_file) apport.report._ignore_file = self.orig_ignore_file self.ui = None self.report_file.close() self.assertEqual(subprocess.call([ 'pidof', '/bin/cat']), 1, 'no stray cats') self.assertEqual(subprocess.call([ 'pidof', '/bin/sleep']), 1, 'no stray sleeps') def test_format_filesize(self): '''format_filesize().''' self.assertEqual(self.ui.format_filesize(0), '0.0 KiB') self.assertEqual(self.ui.format_filesize(2048), '2.0 KiB') self.assertEqual(self.ui.format_filesize(2560), '2.5 KiB') self.assertEqual(self.ui.format_filesize(1000000), '976.6 KiB') self.assertEqual(self.ui.format_filesize(1048576), '1.0 MiB') self.assertEqual(self.ui.format_filesize(2.83116e+06), '2.7 MiB') self.assertEqual(self.ui.format_filesize(1073741824), '1.0 GiB') self.assertEqual(self.ui.format_filesize(0xA0000000L), '2.5 GiB') def test_get_size_loaded(self): '''get_complete_size() and get_reduced_size() for loaded Reports.''' self.ui.load_report(self.report_file.name) self.assertEqual(self.ui.get_complete_size(), os.path.getsize(self.report_file.name)) rs = self.ui.get_reduced_size() self.assert_(rs > 1000) self.assert_(rs < 10000) def test_get_size_constructed(self): '''get_complete_size() and get_reduced_size() for on-the-fly Reports.''' self.ui.report = apport.Report('Bug') self.ui.report['Hello'] = 'World' s = self.ui.get_complete_size() self.assert_(s > 5) self.assert_(s < 100) self.assertEqual(s, self.ui.get_reduced_size()) def test_load_report(self): '''load_report().''' self.ui.load_report(self.report_file.name) self.assertEqual(self.ui.report, self.report) self.assertEqual(self.ui.msg_title, None) del self.report['Package'] del self.report['SourcePackage'] self.update_report_file() self.ui.load_report(self.report_file.name) self.assert_(self.ui.report == None) self.assertEqual(self.ui.msg_title, _('Invalid problem report')) self.assertEqual(self.ui.msg_severity, 'info') self.ui.clear_msg() self.report_file.seek(0) self.report_file.truncate() self.report_file.write('Type: test\nPackage: foo 1-1\nCoreDump: base64\n bOgUs=\n') self.report_file.flush() self.ui.load_report(self.report_file.name) self.assert_(self.ui.report == None) self.assertEqual(self.ui.msg_title, _('Invalid problem report')) self.assertEqual(self.ui.msg_severity, 'error') def test_restart(self): '''restart().''' p = os.path.join(apport.fileutils.report_dir, 'ProcCmdline') r = os.path.join(apport.fileutils.report_dir, 'Custom') self.report['ProcCmdline'] = 'touch ' + p self.update_report_file() self.ui.load_report(self.report_file.name) self.ui.restart() time.sleep(1) self.assert_(os.path.exists(p)) self.assert_(not os.path.exists(r)) os.unlink(p) self.report['RespawnCommand'] = 'touch ' + r self.update_report_file() self.ui.load_report(self.report_file.name) self.ui.restart() time.sleep(1) self.assert_(not os.path.exists(p)) self.assert_(os.path.exists(r)) os.unlink(r) del self.report['RespawnCommand'] self.report['ProcCmdline'] = '/nonexisting' self.update_report_file() self.ui.load_report(self.report_file.name) def test_collect_info_distro(self): '''collect_info() on report without information (distro bug).''' self.ui.report = apport.Report() self.ui.collect_info() self.assert_(set([ 'Date', 'Uname', 'DistroRelease', 'ProblemType']).issubset(set(self.ui.report.keys()))) self.assertEqual(self.ui.ic_progress_pulses, 0, 'no progress dialog for distro bug info collection') def test_collect_info_exepath(self): '''collect_info() on report with only ExecutablePath.''' self.report = apport.Report() self.report['ExecutablePath'] = '/bin/bash' self.update_report_file() self.ui.load_report(self.report_file.name) self.ui.report['Fstab'] = ('/etc/fstab', True) self.ui.report['CompressedValue'] = problem_report.CompressedValue('Test') self.ui.collect_info() self.assert_(set([ 'SourcePackage', 'Package', 'ProblemType', 'Uname', 'Dependencies', 'DistroRelease', 'Date', 'ExecutablePath']).issubset(set(self.ui.report.keys()))) self.assert_(self.ui.ic_progress_pulses > 0, 'progress dialog for package bug info collection') self.assertEqual(self.ui.ic_progress_active, False, 'progress dialog for package bug info collection finished') def test_collect_info_package(self): '''collect_info() on report with a package.''' self.ui.report = apport.Report() self.ui.cur_package = 'bash' self.ui.collect_info() self.assert_(set([ 'SourcePackage', 'Package', 'ProblemType', 'Uname', 'Dependencies', 'DistroRelease', 'Date']).issubset(set(self.ui.report.keys()))) self.assert_(self.ui.ic_progress_pulses > 0, 'progress dialog for package bug info collection') self.assertEqual(self.ui.ic_progress_active, False, 'progress dialog for package bug info collection finished') def test_handle_duplicate(self): '''handle_duplicate().''' self.ui.load_report(self.report_file.name) self.assertEqual(self.ui.handle_duplicate(), False) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, None) demo_url = 'http://example.com/1' self.report['BugPatternURL'] = demo_url self.update_report_file() self.ui.load_report(self.report_file.name) self.assertEqual(self.ui.handle_duplicate(), True) self.assertEqual(self.ui.msg_severity, 'info') self.assertEqual(self.ui.opened_url, demo_url) def test_run_nopending(self): '''running the frontend without any pending reports.''' sys.argv = [] self.ui = _TestSuiteUserInterface() self.assertEqual(self.ui.run_argv(), False) def test_run_report_bug_noargs(self): '''run_report_bug() without specifying arguments.''' sys.argv = [ 'ui-test', '-f'] self.ui = _TestSuiteUserInterface() self.assertEqual(self.ui.run_argv(), False) self.assertEqual(self.ui.msg_severity, 'error') def test_run_report_bug_package(self): '''run_report_bug() for a package.''' sys.argv = [ 'ui-test', '-f', '-p', 'bash'] self.ui = _TestSuiteUserInterface() self.assertEqual(self.ui.run_argv(), True) self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, 'http://bash.bugs.example.com/%i' % self.ui.crashdb.latest_id()) self.assert_(self.ui.ic_progress_pulses > 0) self.assertEqual(self.ui.report['SourcePackage'], 'bash') self.assert_('Dependencies' in self.ui.report.keys()) self.assert_('ProcEnviron' in self.ui.report.keys()) self.assertEqual(self.ui.report['ProblemType'], 'Bug') sys.argv = [ 'ui-test', '-f', '-p', 'nonexisting_gibberish'] self.ui = _TestSuiteUserInterface() self.ui.run_argv() self.assertEqual(self.ui.msg_severity, 'error') def test_run_report_bug_pid(self): '''run_report_bug() for a pid.''' pid = os.fork() if pid == 0: os.execv('/bin/sleep', [ 'sleep', '10000']) if not False: raise AssertionError, 'Could not execute /bin/sleep' time.sleep(0.5) try: sys.argv = [ 'ui-test', '-f', '-P', str(pid)] self.ui = _TestSuiteUserInterface() self.assertEqual(self.ui.run_argv(), True) finally: os.kill(pid, signal.SIGKILL) os.waitpid(pid, 0) self.assert_('SourcePackage' in self.ui.report.keys()) self.assert_('Dependencies' in self.ui.report.keys()) self.assert_('ProcMaps' in self.ui.report.keys()) self.assertEqual(self.ui.report['ExecutablePath'], '/bin/sleep') self.failIf(self.ui.report.has_key('ProcCmdline')) self.assert_('ProcEnviron' in self.ui.report.keys()) self.assertEqual(self.ui.report['ProblemType'], 'Bug') self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, 'http://coreutils.bugs.example.com/%i' % self.ui.crashdb.latest_id()) self.assert_(self.ui.ic_progress_pulses > 0) def test_run_report_bug_wrong_pid(self): '''run_report_bug() for a nonexisting pid.''' pid = 1 while True: pid += 1 try: os.kill(pid, 0) continue except OSError: e = None if e.errno == errno.ESRCH: break e.errno == errno.ESRCH None<EXCEPTION MATCH>OSError sys.argv = [ 'ui-test', '-f', '-P', str(pid)] self.ui = _TestSuiteUserInterface() self.ui.run_argv() def test_run_report_bug_noperm_pid(self): '''run_report_bug() for a pid which runs as a different user.''' if not os.getuid() > 0: raise AssertionError, 'this test must not be run as root' sys.argv = [ 'ui-test', '-f', '-P', '1'] self.ui = _TestSuiteUserInterface() self.ui.run_argv() self.assertEqual(self.ui.msg_severity, 'error') def test_run_report_bug_unpackaged_pid(self): '''run_report_bug() for a pid of an unpackaged program.''' (fd, exename) = tempfile.mkstemp() os.write(fd, open('/bin/cat').read()) os.close(fd) os.chmod(exename, 493) pid = os.fork() if pid == 0: os.execv(exename, [ exename]) try: sys.argv = [ 'ui-test', '-f', '-P', str(pid)] self.ui = _TestSuiteUserInterface() self.assertRaises(SystemExit, self.ui.run_argv) finally: os.kill(pid, signal.SIGKILL) os.wait() os.unlink(exename) self.assertEqual(self.ui.msg_severity, 'error') def _gen_test_crash(self): '''Generate a Report with real crash data.''' test_executable = '/bin/cat' if not os.access(test_executable, os.X_OK): raise AssertionError, test_executable + ' is not executable' pid = os.fork() try: time.sleep(0.5) coredump = os.path.join(apport.fileutils.report_dir, 'core') if not subprocess.call([ 'gdb', '--batch', '--ex', 'generate-core-file ' + coredump, test_executable, str(pid)], stdout = subprocess.PIPE, stderr = subprocess.PIPE) == 0: raise AssertionError r = apport.Report() r['ExecutablePath'] = test_executable r['CoreDump'] = (coredump,) r['Signal'] = '11' r.add_proc_info(pid) r.add_user_info() finally: os.kill(pid, signal.SIGKILL) os.waitpid(pid, 0) return r def test_run_crash(self): '''run_crash().''' r = self._gen_test_crash() report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_crash_response = { 'action': 'cancel', 'blacklist': False } self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, None) self.assertEqual(self.ui.ic_progress_pulses, 0) r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_crash_response = { 'action': 'report', 'blacklist': False } self.ui.present_details_response = 'cancel' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None, 'has %s message: %s: %s' % (self.ui.msg_severity, str(self.ui.msg_title), str(self.ui.msg_text))) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, None) self.assertNotEqual(self.ui.ic_progress_pulses, 0) r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_crash_response = { 'action': 'report', 'blacklist': False } self.ui.present_details_response = 'full' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, 'http://coreutils.bugs.example.com/%i' % self.ui.crashdb.latest_id()) self.assertNotEqual(self.ui.ic_progress_pulses, 0) self.assert_('SourcePackage' in self.ui.report.keys()) self.assert_('Dependencies' in self.ui.report.keys()) self.assert_('Stacktrace' in self.ui.report.keys()) self.assert_('ProcEnviron' in self.ui.report.keys()) self.assertEqual(self.ui.report['ProblemType'], 'Crash') self.assert_(len(self.ui.report['CoreDump']) > 10000) self.assert_(self.ui.report['Title'].startswith('cat crashed with SIGSEGV')) r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_crash_response = { 'action': 'report', 'blacklist': False } self.ui.present_details_response = 'reduced' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, 'http://coreutils.bugs.example.com/%i' % self.ui.crashdb.latest_id()) self.assertNotEqual(self.ui.ic_progress_pulses, 0) self.assert_('SourcePackage' in self.ui.report.keys()) self.assert_('Dependencies' in self.ui.report.keys()) self.assert_('Stacktrace' in self.ui.report.keys()) self.assertEqual(self.ui.report['ProblemType'], 'Crash') self.assert_(not self.ui.report.has_key('CoreDump')) self.assert_(not self.ui.report.check_ignored()) r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_crash_response = { 'action': 'cancel', 'blacklist': True } self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, None) self.assertEqual(self.ui.ic_progress_pulses, 0) self.assert_(self.ui.report.check_ignored()) def test_run_crash_argv_file(self): '''run_crash() through a file specified on the command line.''' self.report['Package'] = 'bash' self.report['UnsupportableReason'] = 'It stinks.' self.update_report_file() sys.argv = [ 'ui-test', '-c', self.report_file.name] self.ui = _TestSuiteUserInterface() self.assertEqual(self.ui.run_argv(), True) self.assert_('It stinks.' in self.ui.msg_text, '%s: %s' % (self.ui.msg_title, self.ui.msg_text)) self.assertEqual(self.ui.msg_severity, 'info') sys.argv = [ 'ui-test', '-c', '/nonexisting.crash'] self.ui = _TestSuiteUserInterface() self.assertEqual(self.ui.run_argv(), True) self.assertEqual(self.ui.msg_severity, 'error') def test_run_crash_unsupportable(self): '''run_crash() on a crash with the UnsupportableReason field.''' self.report['UnsupportableReason'] = 'It stinks.' self.report['Package'] = 'bash' self.update_report_file() self.ui.run_crash(self.report_file.name) self.assert_('It stinks.' in self.ui.msg_text, '%s: %s' % (self.ui.msg_title, self.ui.msg_text)) self.assertEqual(self.ui.msg_severity, 'info') def test_run_crash_unreportable(self): '''run_crash() on a crash with the UnreportableReason field.''' self.report['UnreportableReason'] = 'It stinks.' self.report['ExecutablePath'] = '/bin/bash' self.report['Package'] = 'bash 1' self.update_report_file() self.ui.present_crash_response = { 'action': 'report', 'blacklist': False } self.ui.present_details_response = 'full' self.ui.run_crash(self.report_file.name) self.assert_('It stinks.' in self.ui.msg_text, '%s: %s' % (self.ui.msg_title, self.ui.msg_text)) self.assertEqual(self.ui.msg_severity, 'info') def test_run_crash_ignore(self): '''run_crash() on a crash with the Ignore field.''' self.report['Ignore'] = 'True' self.report['ExecutablePath'] = '/bin/bash' self.report['Package'] = 'bash 1' self.update_report_file() self.ui.run_crash(self.report_file.name) self.assertEqual(self.ui.msg_severity, None) def test_run_crash_nocore(self): '''run_crash() for a crash dump without CoreDump.''' test_executable = '/bin/cat' if not os.access(test_executable, os.X_OK): raise AssertionError, test_executable + ' is not executable' pid = os.fork() try: time.sleep(0.5) r = apport.Report() r['ExecutablePath'] = test_executable r['Signal'] = '42' r.add_proc_info(pid) r.add_user_info() finally: os.kill(pid, signal.SIGKILL) os.waitpid(pid, 0) report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, 'error') self.assert_('memory' in self.ui.msg_text, '%s: %s' % (self.ui.msg_title, self.ui.msg_text)) def test_run_crash_preretraced(self): '''run_crash() pre-retraced reports. This happens with crashes which are pre-processed by apport-retrace.''' r = self._gen_test_crash() r.add_gdb_info() del r['CoreDump'] report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_crash_response = { 'action': 'report', 'blacklist': False } self.ui.present_details_response = 'cancel' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None, 'has %s message: %s: %s' % (self.ui.msg_severity, str(self.ui.msg_title), str(self.ui.msg_text))) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, None) self.assertEqual(self.ui.ic_progress_pulses, 0) def test_run_crash_errors(self): '''run_crash() on various error conditions.''' r = apport.Report() r['ExecutablePath'] = '/bin/bash' r['Package'] = 'foobarbaz' r['SourcePackage'] = 'foobarbaz' report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_title, _('Invalid problem report')) self.assertEqual(self.ui.msg_severity, 'error') def test_run_crash_uninstalled(self): '''run_crash() on reports with subsequently uninstalled packages''' r = self._gen_test_crash() r['ExecutablePath'] = '/bin/nonexisting' r['Package'] = 'bash' report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui.present_crash_response = { 'action': 'report', 'blacklist': False } self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_title, _('Invalid problem report')) self.assertEqual(self.ui.msg_severity, 'info') r = apport.Report() r['ExecutablePath'] = '/bin/nonexisting' r['InterpreterPath'] = '/usr/bin/python' r['Traceback'] = 'ZeroDivisionError: integer division or modulo by zero' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_title, _('Invalid problem report')) self.assertEqual(self.ui.msg_severity, 'info') r = apport.Report() r['ExecutablePath'] = '/bin/sh' r['InterpreterPath'] = '/usr/bin/nonexisting' r['Traceback'] = 'ZeroDivisionError: integer division or modulo by zero' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_title, _('Invalid problem report')) self.assertEqual(self.ui.msg_severity, 'info') def test_run_crash_package(self): '''run_crash() for a package error.''' r = apport.Report('Package') r['Package'] = 'bash' r['SourcePackage'] = 'bash' r['ErrorMessage'] = 'It broke' r['VarLogPackagerlog'] = 'foo\nbar' r.add_os_info() report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_package_error_response = 'cancel' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, None) self.assertEqual(self.ui.ic_progress_pulses, 0) r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_package_error_response = 'report' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, 'http://bash.bugs.example.com/%i' % self.ui.crashdb.latest_id()) self.assert_('SourcePackage' in self.ui.report.keys()) self.assert_('Package' in self.ui.report.keys()) self.assertEqual(self.ui.report['ProblemType'], 'Package') self.assert_('Architecture' in self.ui.report.keys()) self.assert_('DistroRelease' in self.ui.report.keys()) self.assert_('Uname' in self.ui.report.keys()) def test_run_crash_kernel(self): '''run_crash() for a kernel error.''' r = apport.Report('KernelCrash') r['Package'] = apport.packaging.get_kernel_package() r['SourcePackage'] = 'linux' report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_kernel_error_response = 'cancel' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None, 'error: %s - %s' % (self.ui.msg_title, self.ui.msg_text)) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, None) self.assertEqual(self.ui.ic_progress_pulses, 0) r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_kernel_error_response = 'report' self.ui.present_details_response = 'full' self.ui.run_crash(report_file) self.assertEqual(self.ui.msg_severity, None, str(self.ui.msg_title) + ' ' + str(self.ui.msg_text)) self.assertEqual(self.ui.msg_title, None) self.assertEqual(self.ui.opened_url, 'http://linux.bugs.example.com/%i' % self.ui.crashdb.latest_id()) self.assert_('SourcePackage' in self.ui.report.keys()) self.assert_('ProcModules' in self.ui.report.keys()) self.assert_('Lspci' in self.ui.report.keys()) self.assertEqual(self.ui.report['ProblemType'], 'KernelCrash') def test_run_crash_anonymity(self): '''run_crash() anonymization.''' r = self._gen_test_crash() report_file = os.path.join(apport.fileutils.report_dir, 'test.crash') r.write(open(report_file, 'w')) self.ui = _TestSuiteUserInterface() self.ui.present_crash_response = { 'action': 'report', 'blacklist': False } self.ui.present_details_response = 'cancel' self.ui.run_crash(report_file) self.failIf('ProcCwd' in self.ui.report) dump = StringIO() self.ui.report.write(dump) p = pwd.getpwuid(os.getuid()) bad_strings = [ os.uname()[1], p[0], p[4], p[5], os.getcwd()] for s in bad_strings: self.failIf(s in dump.getvalue(), 'dump contains sensitive string: %s' % s) unittest.main()